#!/usr/bin/perl -w
#
# Copyright (c) 2009 Michael Schroeder, Novell Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Check if all jobs in state building are really built on the workers
#

BEGIN {
  my ($wd) = $0 =~ m-(.*)/- ;
  $wd ||= '.';
  unshift @INC,  "$wd/build";
  unshift @INC,  "$wd";
}

use POSIX;
use Data::Dumper;
use Digest::MD5 ();
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Getopt::Long ();

use BSConfiguration;
use BSRPC ':https';
use BSUtil;
use BSXML;
use BSRedisnotify;

use strict;

my $bsdir = $BSConfig::bsdir || "/srv/obs";
my $rundir = $BSConfig::rundir || "$BSConfig::bsdir/run";
my $workersdir = "$BSConfig::bsdir/workers";
my $jobsdir = "$BSConfig::bsdir/jobs";

my $runname = 'bs_warden';

sub parse_options {
  my %opts;
  if (!Getopt::Long::GetOptionsFromArray(\@_, \%opts,
    'stop|exit',
    'restart',
    'logfile=s',
  )) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return \%opts;
}

sub print_usage {
  $0 =~ /([^\/]+$)/;
  print "Usage: $1 [options]

Options:
  --stop|--exit          - graceful shutdown daemon
  --restart              - restart daemon
  --logfile file         - redirect output to logfile

";
}

sub check_exitrestart {
  if (-e "$rundir/$runname.exit") {
    close(RUNLOCK);
    unlink("$rundir/$runname.exit");
    BSUtil::printlog("exiting...");
    exit(0);
  }
  if (-e "$rundir/$runname.restart") {
    close(RUNLOCK);
    unlink("$rundir/$runname.restart");
    BSUtil::printlog("restarting...");
    exec($0, @ARGV);
    die("$0: $!\n");
  }
}

sub updateredisjobstatus {
  my ($arch, $job, $info, $details) = @_;
  return unless $BSConfig::redisserver;
  $info ||= readxml("$jobsdir/$arch/$job", $BSXML::buildinfo, 1);
  BSRedisnotify::updatejobstatus("$info->{'project'}/$info->{'repository'}/$info->{'arch'}", $job, $details) if $info;
}

# copy @ARGV to keep it untouched in case of restart
my $options = parse_options(@ARGV);

BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open logfile if requested
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

$| = 1;
$SIG{'PIPE'} = 'IGNORE';
BSUtil::restartexit($options, 'warden', "$rundir/$runname");

# get lock
mkdir_p($rundir);
open(RUNLOCK, '>>', "$rundir/$runname.lock") || die("$rundir/$runname.lock: $!\n");
flock(RUNLOCK, LOCK_EX | LOCK_NB) || die("worker warden is already running!\n");
utime undef, undef, "$rundir/$runname.lock";

BSUtil::printlog("starting build service worker warden");

my %building;
my $nextorphan = 0;
my $xcheck = 0;
my %orphanchecks;

while (1) {
  my $now = time();
  my %nbuilding;
  for my $wname (ls("$workersdir/building")) {
    next if $wname =~ /^\./;
    $nbuilding{$wname} = $building{$wname} || {'lastcheck' => $now};
  }
  %building = %nbuilding;
  %nbuilding = ();
  for my $wname (sort keys %building) {
    my $b = $building{$wname};
    my $lastcheck = $b->{'lastcheck'};
    $lastcheck += rand(60 * 60);
    next if $lastcheck > $now;
    last if -e "$rundir/$runname.restart";
    last if -e "$rundir/$runname.exit";
    my $worker = readxml("$workersdir/building/$wname", $BSXML::worker, 1);
    next unless $worker && $worker->{'job'} && $worker->{'arch'};
    my $job = $worker->{'job'};
    my $arch = $worker->{'arch'};
    $building{$wname}->{'job'} = "$arch/$job";
    my $js;
    if ($worker->{'reposerver'}) {
      # masterdispatched job. ask slave about job status.
      my $param = {
	'uri' => "$worker->{'reposerver'}/jobs/$arch/$job",
	'timeout' => 60,
      };
      eval {
	$js = BSRPC::rpc($param, $BSXML::jobstatus, 'view=status');
      };
    } else {
      $js = readxml("$jobsdir/$arch/$job:status", $BSXML::jobstatus, 1);
    }
    next unless $js && $js->{'code'} eq 'building';
    next unless $js->{'workerid'} eq $worker->{'workerid'};
    #print "checking worker $wname\n";
    my $param = {
      'uri' => "$js->{'uri'}/worker",
      'timeout' => 60,
    };
    eval {
      BSRPC::rpc($param, undef, "jobid=$js->{'jobid'}");
    };
    if ($@) {
      warn($@);
      # worker is down or doing something weird.
      if ($worker->{'reposerver'}) {
	BSUtil::printlog("restarting build of $arch/$job building on $js->{'workerid'}");
	my $param = {
	  'uri' => "$worker->{'reposerver'}/jobs/$arch/$job",
	  'request' => 'POST',
	  'timeout' => 60,
	};
	eval {
	  BSRPC::rpc($param, undef, 'cmd=idleworker', "workerid=$worker->{'workerid'}", "jobid=$js->{'jobid'}");
	  mkdir_p("$workersdir/down");
	  rename("$workersdir/building/$wname", "$workersdir/down/$wname");
	  delete $building{$wname};
	};
	warn($@) if $@;
      } else {
	local *F;
	my $js2 = BSUtil::lockopenxml(\*F, '<', "$jobsdir/$arch/$job:status", $BSXML::jobstatus, 1);
	if (!$js2 || $js2->{'code'} ne 'building' || $js2->{'jobid'} ne $js->{'jobid'} || $js2->{'workerid'} ne $js->{'workerid'}) {
	  print "build of $job is done on a different worker\n";
	  close F;
	  next;
	}
	BSUtil::printlog("restarting build of $arch/$job building on $js->{'workerid'}");
	updateredisjobstatus($arch, $job);
	unlink("$jobsdir/$arch/$job:status");
	mkdir_p("$workersdir/down");
	rename("$workersdir/building/$wname", "$workersdir/down/$wname");
        delete $building{$wname};
        close F;
      }
    } else {
      $b->{'lastcheck'} = $now;
    }
  }
  if ($now > $nextorphan) {
    $nextorphan = $now + 60;	# every minute
    $xcheck = 0 if $xcheck++ > 10;
    my %buildingjobs = map {($_->{'job'} || '') => 1} values %building;
    for my $arch (sort(ls($jobsdir))) {
      next unless -d "$jobsdir/$arch";
      my @b = sort(grep {!/^\./} ls("$jobsdir/$arch"));
      my %locked = map {$_ => 1} grep {/:status$/} @b;
      # check for orphaned jobs
      my %norphanchecks;
      $orphanchecks{$arch} ||= {};
      for my $job (grep {!/:(?:dir|status|new)$/} @b) {
	next unless $locked{"$job:status"};
	next if $buildingjobs{"$arch/$job"};
	if (!$orphanchecks{$arch}->{$job}) {
	  my @s = stat("$jobsdir/$arch/$job:status");
	  $norphanchecks{$job} = 1 + (((@s ? $s[9] : 0) / 60) % 30);
	} else {
	  $norphanchecks{$job} = $orphanchecks{$arch}->{$job};
	}
	next if $norphanchecks{$job}++ < 30;	# check every 30 minutes
	$norphanchecks{$job} = 1;
	my $js = readxml("$jobsdir/$arch/$job:status", $BSXML::jobstatus, 1);
	if (!$js) {
	  my @s = stat("$jobsdir/$arch/$job:status");
	  if (@s && $s[9] + 3600 < $now) {
	    BSUtil::printlog("removing bad $arch/$job:status file");
	    unlink("$jobsdir/$arch/$job:status");
	  }
	  next;
	}
	if ($js->{'code'} ne 'building') {
	  my @s = stat("$jobsdir/$arch/$job:status");
	  next if !@s || $s[9] + 86400 > $now;
	  if (($js->{'code'} || '') eq 'finished' && !$js->{'endtime'}) {
	    # no endtime, the is probably a fake job we cannot restart. also remove job.
	    BSUtil::printlog("removing stuck fake job $arch/$job");
	    unlink("$jobsdir/$arch/$job");
	    BSUtil::cleandir("$jobsdir/$arch/$job:dir");
	    rmdir("$jobsdir/$arch/$job:dir");
	    updateredisjobstatus($arch, $job);
	    unlink("$jobsdir/$arch/$job:status");
	    next;
	  }
	  BSUtil::printlog("restarting build of $arch/$job stuck in code $js->{'code'}");
	  updateredisjobstatus($arch, $job);
	  unlink("$jobsdir/$arch/$job:status");
	  next;
	}
        next unless $js->{'code'} eq 'building';
	#print "orphan check for $arch/$job...\n";
	my $param = {
	  'uri' => "$js->{'uri'}/worker",
	  'timeout' => 60,
	};
	eval {
	  BSRPC::rpc($param, undef, "jobid=$js->{'jobid'}");
	};
	if ($@) {
	  warn($@);
	  local *F;
	  my $js2 = BSUtil::lockopenxml(\*F, '<', "$jobsdir/$arch/$job:status", $BSXML::jobstatus, 1);
	  if (!$js2 || $js2->{'code'} ne 'building' || $js2->{'jobid'} ne $js->{'jobid'} || $js2->{'workerid'} ne $js->{'workerid'}) {
	    print "build of $job is done on a different worker\n";
	    close F;
	    next;
	  }
	  BSUtil::printlog("restarting orphaned build of $arch/$job building on $js->{'workerid'}");
	  updateredisjobstatus($arch, $job);
	  unlink("$jobsdir/$arch/$job:status");
          close F;
	}
      }
      $orphanchecks{$arch} = \%norphanchecks;
      if (!$xcheck) {
	# check orphaned :dir or :status files
	my %jobs = map {$_ => 1} grep {!/:(?:dir|status|new)$/} @b;
	for my $job (grep {s/:dir$//} @b) {
	  next if $jobs{$job};
	  my @s = stat("$jobsdir/$arch/$job:dir");
	  if (@s && $s[9] + 86400 < $now) {
	    BSUtil::printlog("removing orphaned $arch/$job result directory");
	    BSUtil::cleandir("$jobsdir/$arch/$job:dir");
	    rmdir("$jobsdir/$arch/$job:dir");
	  }
	}
	for my $job (grep {s/:status$//} @b) {
	  next if $jobs{$job};
	  my @s = stat("$jobsdir/$arch/$job:status");
	  if (@s && $s[9] + 86400 < $now) {
	    BSUtil::printlog("removing orphaned $arch/$job status file");
	    unlink("$jobsdir/$arch/$job:status");
	  }
	}
      }
    }
  }

  #print "sleeping\n";
  for my $i (qw{1 2 3 4 5}) {
    check_exitrestart();
    sleep(1);
  }
}
